package com.xl.competition.old.task1.f1

import org.apache.spark.sql.{Row, SparkSession}

import java.util.Properties

/**
 * @author: xl
 * @createTime: 2023/11/12 17:38:09
 * @program: com.xl.competition
 */
object DataLoadLineItem {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    //初始化sparkSession对象
    val sc: SparkSession = SparkSession
      .builder() //利用builder方法构建
      .master("local[*]") //本地模式运行下的线程资源  提交集群时不写
      .appName(this.getClass.getName) //应用名字  方便日志跟踪
      .enableHiveSupport() //支持hive  要写到hive表中
      .config("hive.metastore.uris", "thrift://master:9083") //hive的元数据地址 提交集群可不写但要有hive的环境变量
      .getOrCreate()
    //MYSQL的连接信息
    val properties = new Properties()
    properties.put("user", "root") //用户
    properties.put("password", "Abc123..") //密码

    sc
      .read
      .jdbc("jdbc:mysql://192.168.0.1:3306/competition", "lineitem", properties)
      .createTempView("tmp_lineitem") //创建临时视图
    //先在hive中建表 如果hive中已经存在可忽略
    sc.sql(
      """
        |create table if not exists ods.lineitem
        |(
        |    `linekey`       int,
        |    `orderkey`      int,
        |    `partkey`       int,
        |    `suppkey`       int,
        |    `linenumber`    int,
        |    `quantity`      double,
        |    `extendedprice` decimal,
        |    `discount`      double,
        |    `tax`           double,
        |    `returnflag`    string,
        |    `linestatus`    string,
        |    `shipdate`      string,
        |    `commitdate`    string,
        |    `receiptdate`   string,
        |    `shipinstruct`  string,
        |    `shipmode`      string
        |)
        |    PARTITIONED BY (`dt` STRING)
        |    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        |        NULL DEFINED AS ''
        |    LOCATION '/warehouse/competition/ods/ods_lineitem/'
        |""".stripMargin)

    val row: Row = sc
      .sql("select max(orderkey) from ods.lineitem limit 1")
      .take(1)(0)
    var oldKey = 0
    if (!row.isNullAt(0)) {
      oldKey = row.getInt(0)
    }
    //执行插入语句  从临时视图中读取数据写入到ods.customer中
    sc.sql(
      s"""
         |insert into table ods.lineitem partition(dt='20231108')
         |select linekey,
         |       orderkey,
         |       partkey,
         |       suppkey,
         |       linenumber,
         |       quantity,
         |       extendedprice,
         |       discount,
         |       tax,
         |       returnflag,
         |       linestatus,
         |       cast(shipdate as string),
         |       cast(commitdate as string),
         |       cast(receiptdate as string),
         |       shipinstruct,
         |       shipmode
         |from tmp_lineitem
         |where orderkey > $oldKey
         |""".stripMargin)

    sc.stop()
  }
}
